0150ea9d34337ea80ee87e1e8cc081f65ae89f83,graylog2-shared/src/main/java/org/graylog2/shared/journal/KafkaJournal.java,LogRetentionCleaner,cleanupSegmentsToMaintainSize,#Log#,714
Before Change
return 0;
}
final long[] diff = {kafkaLog.size() - retentionSize};
return kafkaLog.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // sigh scala
@Override
public Object apply(LogSegment segment) {
if (diff[0] - segment.size() >= 0) {
diff[0] -= segment.size();
loggerForCleaner.debug(
"[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}",
segment.baseOffset(),
segment.size(),
diff[0],
retentionSize);
return true;
} else {
return false;
}
}
});
}
private int cleanupSegmentsToRemoveCommitted(Log kafkaLog) {
After Change
return 0;
}
final long[] diff = {currentSize - retentionSize};
int deletedSegments = kafkaLog.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // sigh scala
@Override
public Object apply(LogSegment segment) {
if (diff[0] - segment.size() >= 0) {
diff[0] -= segment.size();
loggerForCleaner.debug(
"[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}",
segment.baseOffset(),
segment.size(),
diff[0],
retentionSize);
return true;
} else {
return false;
}
}
});
KafkaJournal.this.purgedSegmentsInLastRetention.set(deletedSegments);
return deletedSegments;
}
private int cleanupSegmentsToRemoveCommitted(Log kafkaLog) {